分类
联系方式
  1. 新浪微博
  2. E-mail

DartVM IsolateMessageHandler

介绍

在 Isolate 中,代码运行在消息队列中,IsolateMessageHandler 就是 Isolate 中的消息队列处理类。

IsolateMessageHandler 继承自 MessageHandler,要理解 IsolateMessageHandler,首先要理解它的父类 MessageHandler

创建

IsolateMessageHandler 的创建位于 Isolate::InitIsolate 方法中,与 IsolateMessageHandler 相关的代码实现如下:

Isolate* result = new Isolate(isolate_group, api_flags);
// ...
MessageHandler* handler = new IsolateMessageHandler(result);
result->set_message_handler(handler);
result->set_main_port(PortMap::CreatePort(result->set_message_handler()));
// ...

HandleMesage 消息处理

MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(
    std::unique_ptr<Message> message) {
  Thread* thread = Thread::Current();
  StackZone stack_zone(thread);
  Zone* zone = stack_zone.GetZone();
  HandleScope handle_scope(thread);

  // 如果是普通消息,查找它的 handler,并处理了一些异常情况(省略)
  Object& msg_handler = Object::Handle(zone);
  if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {
    msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());
    //......
  }

  // 消息解析,省略消息处理过程,省略消息异常报错
  Object& msg_obj = Object::Handle(zone);
  if (message->IsPersistentHandle()) {
    // msg_array = [<message>, <object-in-message-to-rehash>]
    // ... 省略消息处理过程
  } else {
    msg_obj = ReadMessage(thread, message.get());
  }
  
  Instance& msg = Instance::Handle(zone);
  msg ^= msg_obj.ptr(); // Can't use Instance::Cast because may be null.

  MessageStatus status = kOK;
  // 如果是 OOB 控制消息
  if (message->IsOOB()) {
    // OOB 消息都是固定长度的数组,首元素叫 Smi 描述 OOB 目的地
    if (msg.IsArray()) {
      const Array& oob_msg = Array::Cast(msg);
      if (oob_msg.Length() > 0) {
        const Object& oob_tag = Object::Handle(zone, oob_msg.At(0));
        if (oob_tag.IsSmi()) {
          // 走到这里,说明是一个有效的 OOB 消息
          switch (Smi::Cast(oob_tag).Value()) {
            case Message::kServiceOOBMsg: {
              // 这种跟 Isolate 没有关系
              UNREACHABLE();
              break;
            }
            case Message::kIsolateLibOOBMsg: {
              // 跟 Isolate 有关系的是这一种
              const Error& error = Error::Handle(HandleLibMessage(oob_msg));
              if (!error.IsNull()) {
                status = ProcessUnhandledException(error);
              }
              break;
            }
          }
        }
      }
    }
  } else if (message->dest_port() == Message::kIllegalPort) {
    // 在这里处理普通消息
    // 这里还有一个对 OOB 消息的兜底处理,先省略
    if (msg.IsArray()) {
      // ......
    }
  } else {
    // DartLibraryCalls::HandleMessage 是 Isolate 进行进一步消息分发的方法
    // msg_handler 是前面根据 port id 查到的处理器
    // msg 是解析出来的消息
    const Object& result =
        Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
    if (result.IsError()) {
      status = ProcessUnhandledException(Error::Cast(result));
    } else {
      ASSERT(result.IsNull());
    }
  }
  return status;
}

DartLibraryCalls::LookupHandler

在上面方法中,DartLibraryCalls::LookupHandler 用来根据 id 获取对应的 MessageHandler。

DartLibraryCalls::LookupHandler 的实现如下:

ObjectPtr DartLibraryCalls::LookupHandler(Dart_Port port_id) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  const auto& function = Function::Handle(
      zone, thread->isolate_group()->object_store()->lookup_port_handler());
  const int kNumArguments = 1;
  ASSERT(!function.IsNull());
  Array& args = Array::Handle(
      zone, thread->isolate()->isolate_object_store()->dart_args_1());
  if (args.IsNull()) {
    args = Array::New(kNumArguments);
    thread->isolate()->isolate_object_store()->set_dart_args_1(args);
  }
  args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));
  const Object& result =
      Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  return result.ptr();
}

实现有点奇怪,都是在用反射从 C 层操作 C++ 层的东西。

从 Integer::Handle(zone, Integer::New(port_id)) 可以看出,这是拿着 port_id 生成了一个 Dart 层的整数

isolate_group()->object_store()->lookup_port_handler() 通过一番查找,这个反射(InvokeFunction)最终调用的是 _RawReceivePortImpl(Dart) 的 _lookupHandler。访问的是 Dart 的 PortMap,这样取出的实际上是 Dart 的 ReceivePort,并返回给了 C 层。

DartLibraryCalls::HandleMessage

代码实现如下:

ObjectPtr DartLibraryCalls::HandleMessage(const Object& handler,
                                          const Instance& message) {
  auto thread = Thread::Current();
  auto zone = thread->zone();
  auto isolate = thread->isolate();
  auto object_store = thread->isolate_group()->object_store();
  const auto& function =
      Function::Handle(zone, object_store->handle_message_function());
  const int kNumArguments = 2;
  ASSERT(!function.IsNull());
  Array& args =
      Array::Handle(zone, isolate->isolate_object_store()->dart_args_2());
  if (args.IsNull()) {
    args = Array::New(kNumArguments);
    isolate->isolate_object_store()->set_dart_args_2(args);
  }
  args.SetAt(0, handler);
  args.SetAt(1, message);
  const Object& result =
      Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  ASSERT(result.IsNull() || result.IsError());
  return result.ptr();
}

通过一番查找,handle_message_function() 对应的是 Dart 侧的 sdk/lib/_internal/vm/lib/isolate_patch.dart 的 _handleMessage:

  // Called from the VM to dispatch to the handler.
  @pragma("vm:entry-point", "call")
  static void _handleMessage(Function handler, var message) {
    handler(message);
    _runPendingImmediateCallback();
  }

这里的 Handler 就是传入 Dart PortMap 的 Handler(Dart 实现的),对于 ReceivePort 来说,就是它实现的 StreamController 的 add 方法。